#!/usr/bin/env python3
# Copyright (C) 2025 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
import argparse
import fnmatch
import os
import re
import subprocess
import sys
import time
from typing import List, Optional, Tuple, Iterable

# The script can be executed as a git hook symlink. Using `git` to find the
# repo root is more resilient for worktree usage as a git hook.
try:
  REPO_ROOT = subprocess.check_output(['git', 'rev-parse', '--show-toplevel'],
                                      text=True,
                                      stderr=subprocess.PIPE).strip()
except (subprocess.CalledProcessError, FileNotFoundError) as e:
  print(
      f"Error: Could not determine repository root via git. {e}",
      file=sys.stderr)
  sys.exit(1)

IS_WIN = sys.platform.startswith('win')


def RunAndReportIfLong(func, *args, **kargs):
  """Runs function, reports runtime if it exceeds a limit."""
  start = time.time()
  results = func(*args, **kargs)
  end = time.time()
  limit = 3.0  # seconds
  name = func.__name__
  runtime = end - start
  if runtime > limit:
    print(f"{name} took >{limit:.2f}s ({runtime:.2f}s)", file=sys.stderr)
  return results


def run_command(args: List[str],
                check: bool = True,
                **kwargs) -> subprocess.CompletedProcess:
  """Helper to run a subprocess. Handles capture_output conflict."""
  # Decide whether to use capture_output based on kwargs
  use_capture_output = 'stdout' not in kwargs and 'stderr' not in kwargs
  try:
    if use_capture_output:
      # Default case: capture stdout/stderr using capture_output
      return subprocess.run(
          args, capture_output=True, text=True, check=check, **kwargs)
    else:
      # Case where stdout or stderr is explicitly provided (e.g., DEVNULL)
      # Don't use capture_output=True. Ensure text=True if not overridden.
      kwargs.setdefault('text', True)
      # Filter out capture_output if it was somehow passed in kwargs
      kwargs.pop('capture_output', None)
      return subprocess.run(args, check=check, **kwargs)
  except FileNotFoundError:
    # Specific handling for command not found
    print(
        f"Error: Command '{args[0]}' not found. Please ensure it's installed "
        f"and in your PATH.",
        file=sys.stderr)
    sys.exit(1)
  except subprocess.CalledProcessError as e:
    # Reraise for caller to handle, or handle more specifically if needed
    raise e


def get_changed_files(merge_base: Optional[str]) -> List[str]:
  """Gets list of files changed since merge_base, or all relevant files if
  merge_base is None."""
  try:
    if merge_base:
      # Status checks modified/added files, diff checks files changed since
      # merge-base
      status_result = run_command(['git', 'status', '--porcelain'])
      # Ignore deleted
      staged_or_modified = [
          line.split()[-1]
          for line in status_result.stdout.splitlines()
          if not line.startswith(' D')
      ]

      # Find the merge-base between merge_base and HEAD to avoid picking up
      # other changes if merge_base has moved on.
      try:
        merge_base_result = run_command(
            ['git', 'merge-base', merge_base, 'HEAD'])
        merge_base_commit = merge_base_result.stdout.strip()
      except Exception as _:
        # On the CI, the merge base cannot be found for PRs from external
        # contributors since we do a shallow clone. In that case, use the
        # `merge_base` directly.
        merge_base_commit = merge_base

      diff_result = run_command([
          'git', 'diff', '--name-only', '--diff-filter=crd', merge_base_commit,
          'HEAD'
      ])
      diff_files = diff_result.stdout.splitlines()

      # Combine and make unique, relative to repo root
      all_changed = set(staged_or_modified) | set(diff_files)

      # Normalize paths
      return sorted([os.path.normpath(f) for f in all_changed])
    else:
      # No merge base, get all tracked files
      result = run_command(['git', 'ls-files'])
      return sorted([os.path.normpath(f) for f in result.stdout.splitlines()])
  except (subprocess.CalledProcessError, FileNotFoundError) as e:
    print(f"Error getting changed files: {e}", file=sys.stderr)
    sys.exit(1)


def filter_files(files: List[str],
                 files_to_check: Optional[Iterable[str]] = None,
                 files_to_skip: Optional[Iterable[str]] = None) -> List[str]:
  """Filters a list of files based on include/exclude patterns (fnmatch)."""
  filtered = []
  # Use glob patterns (fnmatch) directly
  check_patterns = list(files_to_check) if files_to_check else [
      '*'
  ]  # Default: check all
  skip_patterns = list(files_to_skip) if files_to_skip else []

  # fnmatch generally works well with forward slashes even on Windows
  # No need to normalize patterns unless specific issues arise

  for f in files:
    # Use os.path.normpath for consistent separators in the file path being
    # checked
    norm_f = os.path.normpath(f)

    # Check skip patterns first
    if any(
        fnmatch.fnmatch(norm_f, pattern) or
        fnmatch.fnmatch(os.path.basename(norm_f), pattern)
        for pattern in skip_patterns):
      # Check against full path and just the filename for flexibility
      continue

    # Check include patterns
    if any(
        fnmatch.fnmatch(norm_f, pattern) or
        fnmatch.fnmatch(os.path.basename(norm_f), pattern)
        for pattern in check_patterns):
      filtered.append(f)  # Return original path 'f'

  return filtered


def read_file_content(filepath: str) -> Optional[List[str]]:
  """Reads a file's content, returns lines or None on error."""
  try:
    # Use absolute path for reading
    abs_path = os.path.join(REPO_ROOT, filepath)
    with open(abs_path, 'r', encoding='utf-8', errors='ignore') as f:
      return f.readlines()
  except FileNotFoundError:
    # This can happen if a file listed in diff was deleted or moved before
    # reading
    print(f"Warning: File not found during read {filepath}", file=sys.stderr)
    return None
  except Exception as e:
    print(f"Error reading file {filepath}: {e}", file=sys.stderr)
    return None


def get_git_file_content(ref: str, filepath: str) -> Optional[List[str]]:
  """Gets file content from a specific git ref."""
  try:
    # Use relative path for git show
    result = run_command(['git', 'show', f'{ref}:{filepath}'],
                         check=False)  # Don't fail if file didn't exist in ref
    if result.returncode == 0:
      return result.stdout.splitlines(keepends=True)
    return []  # File didn't exist or was deleted, treat as empty old content
  except (subprocess.CalledProcessError, FileNotFoundError) as e:
    print(
        f"Error getting git content for {ref}:{filepath}: {e}", file=sys.stderr)
    return None  # Indicate error


def get_presubmit_commit_message() -> str:
  """
  Gets the relevant commit message for presubmit testing.

  When GitHub CI is checking a pull request, HEAD is a temporary merge commit.
  In that scenario, we want the message from the pull request's actual branch,
  which is the second parent (HEAD^2). For local runs, this is not a merge
  commit, so we fall back to HEAD.
  """
  try:
    # Check if HEAD is a merge commit by seeing if it has a second parent. We
    # use --verify to ensure the ref exists, and check=False to handle the
    # failure gracefully when HEAD isn't a merge commit.
    proc = run_command(
        ['git', 'rev-parse', '--verify', 'HEAD^2'],
        check=False,
        stdout=subprocess.PIPE,  # So we can read the hash below.
        stderr=subprocess.DEVNULL)

    if proc.returncode == 0:
      # It's a merge commit. Get the message from the second parent.
      head_ref = proc.stdout.strip()
      print("Detected merge commit. Reading message from pull request HEAD "
            f"({head_ref[:8]}).")
    else:
      # Not a merge commit (e.g., local run). Get the message from HEAD.
      head_ref = 'HEAD'

    result = run_command(['git', 'log', '-1', '--pretty=%B', head_ref])
    return result.stdout

  except (subprocess.CalledProcessError, FileNotFoundError) as e:
    print(f"Error getting commit message: {e}", file=sys.stderr)
    return ""  # Return empty string on error


def CheckDoNotSubmit(changed_files: List[str]) -> List[str]:
  errors = []
  pattern = re.compile(r'DO NOT SUBMIT', re.IGNORECASE)
  files_to_check_list = filter_files(
      changed_files, files_to_skip=['tools/run_presubmit'])

  for f_path in files_to_check_list:
    content = read_file_content(f_path)
    if content:
      for i, line in enumerate(content):
        if pattern.search(line):
          errors.append(f"{f_path}:{i+1}: Found 'DO NOT SUBMIT'")
  return errors


def CheckChangeHasNoTabs(changed_files: List[str]) -> List[str]:
  errors = []
  # Basic filter: skip common binary/data files where tabs might be ok,
  #               c++ files are already handled by clang-format.
  files_to_check_list = filter_files(
      changed_files,
      files_to_skip=[
          '*.cc',
          '*.h',
          '*.md',
          '*.pb',
          '*.png',
          '*.jpg',
          '*/test/data/*',
          'Makefile',
          'rules',
          '*.descriptor',
      ])

  for f_path in files_to_check_list:
    content = read_file_content(f_path)
    if content:
      for i, line in enumerate(content):
        if '\t' in line:
          errors.append(f"{f_path}:{i+1}: Found tab character")
          break  # Only report once per file
  return errors


def CheckCodeFormatted(merge_base: str, skip_formatters: str) -> List[str]:
  # NOTE: format-sources does its own invocation of git diff ... to detect
  # changed file. We use that, rather than passing the list here, to avoid
  # ending in a situation where run_presubmit fails, but then
  # tools/format_sources succeeds because they diverge in their diff logic.
  tool = os.path.join(REPO_ROOT, 'tools/format-sources')
  try:
    args = [
        tool,
        '--quiet',
        '--check-only',
        '--skip=' + skip_formatters,
        '--upstream=' + merge_base,
    ]
    run_command(args, stderr=subprocess.STDOUT)
  except subprocess.CalledProcessError:
    return [f'Please run {tool} to format sources.']
  return []


def CheckIncludeGuards(changed_files: List[str]) -> List[str]:
  if sys.platform == 'win32':
    return []
  tool_rel_path = 'tools/fix_include_guards'
  tool = os.path.join(REPO_ROOT, tool_rel_path)
  # Use fnmatch patterns
  files_to_check_list = filter_files(
      changed_files, files_to_check=['*.cc', '*.h', tool_rel_path])
  if not files_to_check_list:
    return []

  # Check if the tool itself changed
  tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
  # Check if relevant source files changed
  source_files_changed = any(f != tool_rel_path for f in files_to_check_list)

  if not tool_changed and not source_files_changed:
    return []

  try:
    # Run the tool relative to the repo root
    run_command([sys.executable, tool, '--check-only'], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [f'Please run python {tool} to fix include guards.']
  return []


def CheckIncludeViolations(changed_files: List[str]) -> List[str]:
  if sys.platform == 'win32':
    return []
  tool_rel_path = 'tools/check_include_violations'
  tool = os.path.join(REPO_ROOT, tool_rel_path)
  # Use fnmatch patterns
  files_to_check_list = filter_files(
      changed_files, files_to_check=['include/*.h', tool_rel_path])
  if not files_to_check_list:
    return []

  tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
  source_files_changed = any(f != tool_rel_path for f in files_to_check_list)

  if not tool_changed and not source_files_changed:
    return []

  try:
    run_command([sys.executable, tool], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [f'{tool} failed.']
  return []


def CheckIncludePaths(changed_files: List[str]) -> List[str]:
  # Uses fnmatch patterns
  files_to_check_list = filter_files(
      changed_files,
      files_to_check=['*.h', '*.cc'],
      files_to_skip=['examples/sdk/example.cc'])
  error_lines = []

  pattern = re.compile(r'^#include "(.*\.h)"')

  for f_path in files_to_check_list:
    content = read_file_content(f_path)
    if not content:
      continue

    for i, line in enumerate(content):
      match = pattern.search(line)
      if not match:
        continue
      if '// no-include-violation-check' in line:
        continue
      inc_hdr = match.group(1)
      # Normalize path separators for comparison
      norm_inc_hdr = os.path.normpath(inc_hdr)
      # Check if it starts with 'include/perfetto' or 'include\perfetto'
      if norm_inc_hdr.startswith(os.path.join('include', 'perfetto')):
        error_lines.append(
            f'  {f_path}:{i+1}: Redundant "include/" in #include path'
            f' "{inc_hdr}"')
      # Check for relative paths (no directory separator in the normalized path)
      # Exclude paths already starting with 'perfetto/' which is allowed
      # relative to include/
      has_no_os_sep = os.path.sep not in norm_inc_hdr
      has_no_fwd_slash = '/' not in norm_inc_hdr
      does_not_start_with_perfetto = not norm_inc_hdr.startswith('perfetto/')

      if has_no_os_sep and has_no_fwd_slash and does_not_start_with_perfetto:
        # Also allow includes relative to 'src/' for internal headers
        if not norm_inc_hdr.startswith('src/'):
          error_lines.append(
              f'  {f_path}:{i+1}: relative #include "{inc_hdr}" not allowed, '
              f'use full path from include dir (perfetto/...) or project root '
              f'(src/...) ')

  return [] if not error_lines else [
      'Invalid #include paths detected:\n' + '\n'.join(error_lines)
  ]


def CheckProtoComments(changed_files: List[str]) -> List[str]:
  if sys.platform == 'win32':
    return []
  tool_rel_path = 'tools/check_proto_comments'
  tool = os.path.join(REPO_ROOT, tool_rel_path)
  # Use fnmatch patterns
  files_to_check_list = filter_files(
      changed_files, files_to_check=['protos/perfetto/*.proto', tool_rel_path])
  if not files_to_check_list:
    return []

  tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
  source_files_changed = any(f != tool_rel_path for f in files_to_check_list)

  if not tool_changed and not source_files_changed:
    return []

  try:
    run_command([sys.executable, tool], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [f'{tool} failed']
  return []


def CheckBuild(changed_files: List[str]) -> List[str]:
  if sys.platform == 'win32':
    return []
  tool_rel_path = 'tools/gen_bazel'
  tool = os.path.join(REPO_ROOT, tool_rel_path)
  files_to_check_list = filter_files(
      changed_files,
      files_to_check=['*BUILD.gn', '*.gni', 'BUILD.extras', tool_rel_path])
  if not files_to_check_list:
    return []

  tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
  source_files_changed = any(f != tool_rel_path for f in files_to_check_list)

  if not tool_changed and not source_files_changed:
    return []

  try:
    # Run the tool relative to the repo root
    run_command([sys.executable, tool, '--check-only'], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [
        f'Bazel BUILD(s) are out of date. Run python {tool} to update them.'
    ]
  return []


def CheckAndroidBlueprint(changed_files: List[str]) -> List[str]:
  if sys.platform == 'win32':
    return []
  tool_rel_path = 'tools/gen_android_bp'
  tool = os.path.join(REPO_ROOT, tool_rel_path)
  # Use fnmatch patterns
  files_to_check_list = filter_files(
      changed_files,
      files_to_check=['*BUILD.gn', '*.gni', tool_rel_path],
      files_to_skip=['src/trace_processor/perfetto_sql/stdlib/chrome/BUILD.gn'])
  if not files_to_check_list:
    return []

  tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
  source_files_changed = any(f != tool_rel_path for f in files_to_check_list)

  if not tool_changed and not source_files_changed:
    return []

  try:
    run_command([sys.executable, tool, '--check-only'], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [
        f'Android build files are out of date. Run python {tool} to update '
        'them.'
    ]
  return []


def EnsureProtoBinaryExists():
  out_dir = os.path.join(REPO_ROOT, 'out', 'presubmits')
  protoc_path = os.path.join(out_dir, 'protoc') + ('.exe' if IS_WIN else '')
  if os.path.exists(protoc_path):
    return protoc_path
  gn = os.path.join(REPO_ROOT, 'tools', 'gn')
  ninja = os.path.join(REPO_ROOT, 'tools', 'ninja')
  gn_args = 'is_debug=false'
  subprocess.check_call(
      [sys.executable, gn, 'gen', f'--args={gn_args}', out_dir], cwd=REPO_ROOT)
  subprocess.check_call([sys.executable, ninja, '-C', out_dir, 'protoc'])
  return protoc_path


def CheckBinaryDescriptors(changed_files: List[str]) -> List[str]:
  if sys.platform == 'win32':
    return []
  tool_rel_path = 'tools/gen_binary_descriptors'
  tool = os.path.join(REPO_ROOT, tool_rel_path)
  # Use fnmatch patterns
  files_to_check_list = filter_files(
      changed_files,
      files_to_check=['protos/perfetto/*.proto', '*.h', tool_rel_path])
  if not files_to_check_list:
    return []

  tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
  source_files_changed = any(f != tool_rel_path for f in files_to_check_list)

  if not tool_changed and not source_files_changed:
    return []

  protoc = EnsureProtoBinaryExists()
  try:
    run_command([sys.executable, tool, '--check-only', f'--protoc={protoc}'],
                cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [f'Please run python {tool} to update binary descriptors.']
  return []


def CheckMergedTraceConfigProto(changed_files: List[str]) -> List[str]:
  if sys.platform == 'win32':
    return []
  tool_rel_path = 'tools/gen_merged_protos'
  tool = os.path.join(REPO_ROOT, tool_rel_path)
  # Use fnmatch patterns
  files_to_check_list = filter_files(
      changed_files, files_to_check=['protos/perfetto/*.proto', tool_rel_path])
  if not files_to_check_list:
    return []

  tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
  source_files_changed = any(f != tool_rel_path for f in files_to_check_list)

  if not tool_changed and not source_files_changed:
    return []

  try:
    run_command([sys.executable, tool, '--check-only'], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [
        'perfetto_config.proto or perfetto_trace.proto is out of ' +
        f'date. Please run python {tool} to update it.'
    ]
  return []


def CheckProtoEventList(changed_files: List[str],
                        merge_base: Optional[str]) -> List[str]:
  target_file = 'src/tools/ftrace_proto_gen/event_list'
  norm_target_file = os.path.normpath(target_file)

  if norm_target_file not in [os.path.normpath(f) for f in changed_files]:
    return []

  if not merge_base:
    print(
        f"Warning: Cannot check {target_file} history accurately without "
        f"merge-base.",
        file=sys.stderr)
    return []  # Skip check if we don't have history baseline

  old_content = get_git_file_content(merge_base, target_file)
  new_content = read_file_content(target_file)

  if old_content is None or new_content is None:
    return [f"Error reading old or new content for {target_file}"]

  # Allow adding lines or replacing lines with 'removed...'
  errors = []
  # Use zip_longest to handle lines added/removed at the end
  from itertools import zip_longest
  for i, (old_line_raw,
          new_line_raw) in enumerate(zip_longest(old_content, new_content)):
    old_line = old_line_raw.rstrip('\n\r') if old_line_raw is not None else None
    new_line = new_line_raw.rstrip('\n\r') if new_line_raw is not None else None

    if old_line == new_line:
      continue
    elif old_line is None and new_line is not None:
      # Line added - this is okay
      continue
    elif old_line is not None and new_line is None:
      # Line removed - check if it should have been replaced
      if old_line.strip() and not old_line.strip().startswith(
          '#'):  # Ignore empty/comment lines removal
        errors.append(
            f"{target_file}:{i+1}: Line removed without being replaced by "
            f"'removed': '{old_line}'")
    elif old_line is not None and new_line is not None:
      # Line changed - check if replacement is valid
      if not new_line.strip().startswith('removed'):
        errors.append(
            f"{target_file}:{i+1}: Invalid change. Only appending or replacing "
            f"with 'removed' is allowed. Got '{new_line}' from '{old_line}'")

  if errors:
    return [
        f'{target_file} only has two supported changes: '
        f'appending a new line, and replacing a line content starting with '
        f'"removed".\n' + "\n".join(errors)
    ]

  return []


def CheckBannedCpp(changed_files: List[str]) -> List[str]:
  bad_cpp: List[Tuple[str, str]] = [
      (r'\bstd::stoi\b',
       'std::stoi throws exceptions prefer base::StringToInt32()'),
      (r'\bstd::stol\b',
       'std::stoull throws exceptions prefer base::StringToInt32()'),
      (r'\bstd::stoul\b',
       'std::stoull throws exceptions prefer base::StringToUint32()'),
      (r'\bstd::stoll\b',
       'std::stoull throws exceptions prefer base::StringToInt64()'),
      (r'\bstd::stoull\b',
       'std::stoull throws exceptions prefer base::StringToUint64()'),
      (r'\bstd::stof\b',
       'std::stof throws exceptions prefer base::StringToDouble()'),
      (r'\bstd::stod\b',
       'std::stod throws exceptions prefer base::StringToDouble()'),
      (r'\bstd::stold\b',
       'std::stold throws exceptions prefer base::StringToDouble()'),
      (r'\bstrncpy\b',
       'strncpy does not null-terminate if src > dst. Use base::StringCopy'),
      (r'[(=]\s*snprintf\(',
       'snprintf can return > dst_size. Use base::SprintfTrunc'),
      (r'//.*\bDNS\b',
       '// DNS (Do Not Ship) found. Did you mean to remove some testing code?'),
      (r'\bPERFETTO_EINTR\(close\(',
       'close(2) must not be retried on EINTR on Linux and other OSes '
       'that we run on, as the fd will be closed.'),
      (r'^#include <inttypes.h>', 'Use <cinttypes> rather than <inttypes.h>. ' +
       'See https://github.com/google/perfetto/issues/146'),
  ]

  # Use fnmatch patterns for filtering files
  files_to_check_list = filter_files(
      changed_files, files_to_check=['*.h', '*.cc'])
  errors = []
  comment_pattern = re.compile(r'^\s*//')  # Regex is fine here

  for f_path in files_to_check_list:
    content = read_file_content(f_path)
    if content:
      for i, line in enumerate(content):
        if comment_pattern.search(line):
          continue  # Skip comments
        for regex_str, message in bad_cpp:
          # Use re.search for checking content
          if re.search(regex_str, line):
            errors.append(f'Banned pattern:\n  {f_path}:{i+1}: {message}')
  return errors


def CheckSqlModules(changed_files: List[str]) -> List[str]:
  if sys.platform == 'win32':
    return []
  tool_rel_path = 'tools/check_sql_modules.py'
  tool = os.path.join(REPO_ROOT, tool_rel_path)
  # Use fnmatch patterns
  files_to_check_list = filter_files(
      changed_files,
      files_to_check=[
          'src/trace_processor/perfetto_sql/stdlib/*.sql', tool_rel_path
      ])
  if not files_to_check_list:
    return []

  tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
  source_files_changed = any(f != tool_rel_path for f in files_to_check_list)

  if not tool_changed and not source_files_changed:
    return []

  try:
    run_command([sys.executable, tool, '--check-includes'], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [f'{tool} failed']
  return []


def CheckSqlMetrics(changed_files: List[str]) -> List[str]:
  if sys.platform == 'win32':
    return []
  tool_rel_path = 'tools/check_sql_metrics.py'
  tool = os.path.join(REPO_ROOT, tool_rel_path)
  # Use fnmatch patterns
  files_to_check_list = filter_files(
      changed_files,
      files_to_check=['src/trace_processor/metrics/*.sql', tool_rel_path])
  if not files_to_check_list:
    return []

  tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
  source_files_changed = any(f != tool_rel_path for f in files_to_check_list)

  if not tool_changed and not source_files_changed:
    return []

  try:
    run_command([sys.executable, tool], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [f'{tool} failed']
  return []


def CheckTestData(_: List[str]) -> List[str]:
  tool_rel_path = 'tools/test_data'
  tool = os.path.join(REPO_ROOT, tool_rel_path)
  try:
    # Assume tool needs execution permissions or use sys.executable
    run_command([sys.executable, tool, 'status', '--quiet'], cwd=REPO_ROOT)
  except FileNotFoundError:
    # Try without sys.executable if it's meant to be directly executable
    try:
      run_command([tool, 'status', '--quiet'], cwd=REPO_ROOT)
    except FileNotFoundError:
      return [f"Tool not found: {tool}"]
    except subprocess.CalledProcessError:
      # Fall through to error message
      pass
    else:
      return []  # Succeeded without sys.executable
  except subprocess.CalledProcessError:
    # Fall through to error message
    pass
  else:  # If try block succeeded
    return []

  # If we reached here, it failed
  return [
      f'//test/data is out of sync. Run `{tool} status` for more.\n'
      f'If you rebaselined UI tests or added a new test trace, run: `{tool} '
      f'upload`.\n'
      f'Otherwise run `tools/install-build-deps` or `{tool} download '
      f'--overwrite` to sync local test_data'
  ]


def CheckChromeStdlib(changed_files: List[str]) -> List[str]:
  # Use fnmatch patterns
  stdlib_paths_patterns = ("src/trace_processor/perfetto_sql/stdlib/chrome/*",
                           "test/data/chrome/*",
                           "test/trace_processor/diff_tests/stdlib/chrome/*")

  chrome_stdlib_files = filter_files(
      changed_files, files_to_check=stdlib_paths_patterns)

  if not chrome_stdlib_files:
    return []

  # Check commit message for exceptions
  commit_message = get_presubmit_commit_message()
  # Use regex for more robust check in commit message
  if re.search(r'COPYBARA_IMPORT', commit_message, re.IGNORECASE):
    print(
        "INFO: COPYBARA_IMPORT detected, skipping CheckChromeStdlib.",
        file=sys.stderr)
    return []
  if re.search(r'CHROME_STDLIB_MANUAL_ROLL', commit_message, re.IGNORECASE):
    print(
        "INFO: CHROME_STDLIB_MANUAL_ROLL detected, skipping CheckChromeStdlib.",
        file=sys.stderr)
    return []

  # If files changed and no exception found, return error
  # Use relative paths from patterns for message
  paths_str = ', '.join(p.replace('*', '') for p in stdlib_paths_patterns)
  message = (
      f'Files under {paths_str} '
      'are rolled from the Chromium repository by a '
      'Copybara service.\nYou should not modify these in '
      'the Perfetto repository, please make your changes '
      'in Chromium instead.\n'
      'Affected files:\n' +
      "\n".join([f"  - {f}" for f in chrome_stdlib_files]) + "\n"
      'If you want to do a manual roll, you must specify '
      'CHROME_STDLIB_MANUAL_ROLL=<reason> in the CL description/commit message.'
  )
  return [message]


def CheckAmalgamatedPythonTools(changed_files: List[str]) -> List[str]:
  if sys.platform == 'win32':
    return []
  tool_rel_path = 'tools/gen_amalgamated_python_tools'
  tool = os.path.join(REPO_ROOT, tool_rel_path)
  # Use fnmatch patterns
  files_to_check_list = filter_files(
      changed_files, files_to_check=['python/*', tool_rel_path])
  if not files_to_check_list:
    return []

  tool_changed = tool_rel_path in [os.path.normpath(f) for f in changed_files]
  source_files_changed = any(f != tool_rel_path for f in files_to_check_list)

  if not tool_changed and not source_files_changed:
    return []

  try:
    run_command([sys.executable, tool, '--check-only'], cwd=REPO_ROOT)
  except FileNotFoundError:
    return [f"Tool not found: {tool}"]
  except subprocess.CalledProcessError:
    return [
        f'amalgamated python tools/ are out of date. Run python {tool} to '
        f'update them.'
    ]
  return []


def CheckAbsolutePathsInGn(changed_files: List[str]) -> List[str]:
  # Use fnmatch patterns for filtering
  files_to_check_list = filter_files(
      changed_files,
      files_to_check=['*.gni', '*.gn'],  # Simplified include
      files_to_skip=[
          '.gn',  # Skip root .gn file by name
          'gn/*',  # Skip files in root gn/ directory
          'buildtools/*',  # Skip files in root buildtools/ directory
      ])

  error_lines = []
  # Pattern to find "//" inside quotes. Regex is fine here.
  abs_path_pattern = re.compile(r'"//[^"]')
  nogncheck_pattern = re.compile(r'#\s*nogncheck', re.IGNORECASE)
  comment_pattern = re.compile(r'^\s*#')

  for f_path in files_to_check_list:
    content = read_file_content(f_path)
    if content:
      for i, line in enumerate(content):
        if nogncheck_pattern.search(line) or comment_pattern.match(line):
          continue  # Skip comments and '# nogncheck' lines
        if abs_path_pattern.search(line):
          error_lines.append(f'  {f_path}:{i+1}: {line.strip()}')

  if not error_lines:
    return []
  return [
      'Use relative paths in GN rather than absolute ("//..."): Check these '
      'lines:\n' + '\n'.join(error_lines)
  ]


def CheckUiImports(changed_files: List[str]) -> List[str]:
  tool_rel_path = 'tools/check_imports'
  tool = os.path.join(REPO_ROOT, tool_rel_path)
  # Use fnmatch patterns
  files_to_check_list = filter_files(
      changed_files, files_to_check=['ui/*', tool_rel_path])
  if not files_to_check_list:
    return []
  try:
    run_command([sys.executable, tool], cwd=REPO_ROOT, stderr=subprocess.STDOUT)
  except subprocess.CalledProcessError:
    return [f'Import violations detected in ui/ sources. See {tool_rel_path}']
  return []


def CheckUiRatchet(changed_files: List[str]) -> List[str]:
  tool_rel_path = 'tools/check_ratchet'
  tool = os.path.join(REPO_ROOT, tool_rel_path)
  # Use fnmatch patterns
  files_to_check_list = filter_files(
      changed_files, files_to_check=['ui/*', tool_rel_path])
  if not files_to_check_list:
    return []
  try:
    run_command([sys.executable, tool], cwd=REPO_ROOT, stderr=subprocess.STDOUT)
  except subprocess.CalledProcessError:
    return [f'Bad patterns detected in ui/ sources. See {tool_rel_path}']
  return []


def main():
  parser = argparse.ArgumentParser()
  parser.add_argument('--merge-base', default=None)
  parser.add_argument(
      '--skip-formatters',
      default='',
      help='Comma-separated list of code formatters to skip')
  # The pre-push hook passes extra cmdline arguments like 'origin git@...'.
  args, positional_args = parser.parse_known_args()

  # 1. Determine files to check.
  merge_base = args.merge_base
  if merge_base is None:
    remote = 'origin'
    if not sys.stdin.isatty() and len(positional_args) > 1:
      remote = positional_args[0]  # Get the remote fromt the git hook cmdline.
    merge_base = subprocess.check_output(
        ['git', 'merge-base', 'HEAD', remote + '/main'], text=True).strip()

  changed_files = get_changed_files(merge_base)

  if not changed_files:
    print("No changed files detected relative to upstream or in working dir.")
    sys.exit(0)

  print(f"Checking {len(changed_files)} files...")  # Optional verbosity

  # 3. Run all checks
  all_results = []

  # Use RunAndReportIfLong for all checks
  checks_to_run = [
      (CheckCodeFormatted, [merge_base, args.skip_formatters]),
      (CheckDoNotSubmit, [changed_files]),
      (CheckChangeHasNoTabs, [changed_files]),
      (CheckIncludeGuards, [changed_files]),
      (CheckIncludeViolations, [changed_files]),
      (CheckIncludePaths, [changed_files]),
      (CheckProtoComments, [changed_files]),
      (CheckBuild, [changed_files]),
      (CheckAndroidBlueprint, [changed_files]),
      (CheckBinaryDescriptors, [changed_files]),
      (CheckMergedTraceConfigProto, [changed_files]),
      (CheckProtoEventList, [changed_files, merge_base]),  # Needs merge_base
      (CheckBannedCpp, [changed_files]),
      (CheckSqlModules, [changed_files]),  # Includes --check-includes
      (CheckSqlMetrics, [changed_files]),
      (CheckTestData, [changed_files
                      ]),  # Doesn't need specific files list but pass anyway
      (CheckAmalgamatedPythonTools, [changed_files]),
      (CheckChromeStdlib, [changed_files]),  # Checks commit msg
      (CheckAbsolutePathsInGn, [changed_files]),
      (CheckUiImports, [changed_files]),
      (CheckUiRatchet, [changed_files]),
  ]

  for func, args in checks_to_run:
    all_results.extend(RunAndReportIfLong(func, *args))

  # 4. Report results
  if all_results:
    print("\n--- Presubmit Errors Found ---", file=sys.stderr)
    # Filter out potential None results if any check returns None on error
    for error in filter(None, all_results):
      print(f"- {error}", file=sys.stderr)
    print("\n--- Presubmit Failed ---", file=sys.stderr)
    sys.exit(1)
  else:
    print("\n--- Presubmit Succeeded ---")
    sys.exit(0)


if __name__ == '__main__':
  # Ensure we run from repo root for consistent paths
  os.chdir(REPO_ROOT)
  main()
